{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 逻辑回归\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import LogisticRegression\n",
    " \n",
    "# Load training data\n",
    "training = spark.read.format(\"libsvm\").load(\"data/mllib/sample_libsvm_data.txt\")\n",
    " \n",
    "lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)\n",
    " \n",
    "# Fit the model\n",
    "lrModel = lr.fit(training)\n",
    " \n",
    "# Print the coefficients and intercept for logistic regression\n",
    "print(\"Coefficients: \" + str(lrModel.coefficients))\n",
    "print(\"Intercept: \" + str(lrModel.intercept))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 梯度迭代树（GBDT）"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "from pyspark.ml.classification import GBTClassifier\n",
    "from pyspark.ml.feature import StringIndexer, VectorIndexer\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    " \n",
    "# Load and parse the data file, converting it to a DataFrame.\n",
    "data = spark.read.format(\"libsvm\").load(\"data/mllib/sample_libsvm_data.txt\")\n",
    " \n",
    "# Index labels, adding metadata to the label column.\n",
    "# Fit on whole dataset to include all labels in index.\n",
    "labelIndexer = StringIndexer(inputCol=\"label\", outputCol=\"indexedLabel\").fit(data)\n",
    "# Automatically identify categorical features, and index them.\n",
    "# Set maxCategories so features with > 4 distinct values are treated as continuous.\n",
    "featureIndexer =\\\n",
    "    VectorIndexer(inputCol=\"features\", outputCol=\"indexedFeatures\", maxCategories=4).fit(data)\n",
    " \n",
    "# Split the data into training and test sets (30% held out for testing)\n",
    "(trainingData, testData) = data.randomSplit([0.7, 0.3])\n",
    " \n",
    "# Train a GBT model.\n",
    "gbt = GBTClassifier(labelCol=\"indexedLabel\", featuresCol=\"indexedFeatures\", maxIter=10)\n",
    " \n",
    "# Chain indexers and GBT in a Pipeline\n",
    "pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])\n",
    " \n",
    "# Train model.  This also runs the indexers.\n",
    "model = pipeline.fit(trainingData)\n",
    " \n",
    "# Make predictions.\n",
    "predictions = model.transform(testData)\n",
    " \n",
    "# Select example rows to display.\n",
    "predictions.select(\"prediction\", \"indexedLabel\", \"features\").show(5)\n",
    " \n",
    "# Select (prediction, true label) and compute test error\n",
    "evaluator = MulticlassClassificationEvaluator(\n",
    "    labelCol=\"indexedLabel\", predictionCol=\"prediction\", metricName=\"accuracy\")\n",
    "accuracy = evaluator.evaluate(predictions)\n",
    "print(\"Test Error = %g\" % (1.0 - accuracy))\n",
    " \n",
    "gbtModel = model.stages[2]\n",
    "print(gbtModel)  # summary only"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 多层感知机（MLP）"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import MultilayerPerceptronClassifier\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    " \n",
    "# Load training data\n",
    "data = spark.read.format(\"libsvm\")\\\n",
    "    .load(\"data/mllib/sample_multiclass_classification_data.txt\")\n",
    "# Split the data into train and test\n",
    "splits = data.randomSplit([0.6, 0.4], 1234)\n",
    "train = splits[0]\n",
    "test = splits[1]\n",
    "# specify layers for the neural network:\n",
    "# input layer of size 4 (features), two intermediate of size 5 and 4\n",
    "# and output of size 3 (classes)\n",
    "layers = [4, 5, 4, 3]\n",
    "# create the trainer and set its parameters\n",
    "trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)\n",
    "# train the model\n",
    "model = trainer.fit(train)\n",
    "# compute accuracy on the test set\n",
    "result = model.transform(test)\n",
    "predictionAndLabels = result.select(\"prediction\", \"label\")\n",
    "evaluator = MulticlassClassificationEvaluator(metricName=\"accuracy\")\n",
    "print(\"Accuracy: \" + str(evaluator.evaluate(predictionAndLabels)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## One-vs-Rest\n",
    "\n",
    "OneVsRest将一个给定的二分类算法有效地扩展到多分类问题应用中，也叫做“One-vs-All.”算法。OneVsRest是一个Estimator。它采用一个基础的Classifier然后对于k个类别分别创建二分类问题。类别i的二分类分类器用来预测类别为i还是不为i，即将i类和其他类别区分开来。最后，通过依次对k个二分类分类器进行评估，取置信最高的分类器的标签作为i类别的标签。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import LogisticRegression, OneVsRest\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    " \n",
    "# load data file.\n",
    "inputData = spark.read.format(\"libsvm\") \\\n",
    "    .load(\"data/mllib/sample_multiclass_classification_data.txt\")\n",
    " \n",
    "# generate the train/test split.\n",
    "(train, test) = inputData.randomSplit([0.8, 0.2])\n",
    " \n",
    "# instantiate the base classifier.\n",
    "lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)\n",
    " \n",
    "# instantiate the One Vs Rest Classifier.\n",
    "ovr = OneVsRest(classifier=lr)\n",
    " \n",
    "# train the multiclass model.\n",
    "ovrModel = ovr.fit(train)\n",
    " \n",
    "# score the model on test data.\n",
    "predictions = ovrModel.transform(test)\n",
    " \n",
    "# obtain evaluator.\n",
    "evaluator = MulticlassClassificationEvaluator(metricName=\"accuracy\")\n",
    " \n",
    "# compute the classification error on test data.\n",
    "accuracy = evaluator.evaluate(predictions)\n",
    "print(\"Test Error : \" + str(1 - accuracy))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 朴素贝叶斯算法"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.classification import NaiveBayes\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
    " \n",
    "# Load training data\n",
    "data = spark.read.format(\"libsvm\") \\\n",
    "    .load(\"data/mllib/sample_libsvm_data.txt\")\n",
    "# Split the data into train and test\n",
    "splits = data.randomSplit([0.6, 0.4], 1234)\n",
    "train = splits[0]\n",
    "test = splits[1]\n",
    " \n",
    "# create the trainer and set its parameters\n",
    "nb = NaiveBayes(smoothing=1.0, modelType=\"multinomial\")\n",
    " \n",
    "# train the model\n",
    "model = nb.fit(train)\n",
    "# compute accuracy on the test set\n",
    "result = model.transform(test)\n",
    "predictionAndLabels = result.select(\"prediction\", \"label\")\n",
    "evaluator = MulticlassClassificationEvaluator(metricName=\"accuracy\")\n",
    "print(\"Accuracy: \" + str(evaluator.evaluate(predictionAndLabels)))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
